package com.ycl.javacore.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;

import javax.jms.*;

/**
 * Demo JMS topic subscriber for ActiveMQ.
 *
 * <p>Connects to the broker, subscribes to the topic {@code activemq-topic-test1} with a
 * non-durable consumer, prints every message delivered to the asynchronous listener, keeps
 * the process alive for a fixed period, and then closes the connection.
 *
 * <p>Prefetch notes: a prefetch of 0 switches the consumer to pull mode and a prefetch
 * greater than 0 means push mode. Pull mode only works with synchronous
 * {@code MessageConsumer.receive(...)} calls; registering a {@link MessageListener} on a
 * consumer whose prefetch is 0 fails with "Illegal prefetch size of zero. This setting is
 * not supported for asynchronous consumers please set a value of at least 1".
 */
public class TopicSubscriber {

    /**
     * Default user name.
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * Default password.
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Default broker URL.
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    /**
     * Name of the topic this demo subscribes to.
     */
    private static final String TOPIC_NAME = "activemq-topic-test1";

    /**
     * Topic prefetch size. Must be at least 1 because the consumer below uses an
     * asynchronous listener; use 0 only together with synchronous receive() calls.
     */
    private static final int TOPIC_PREFETCH = 1;

    /**
     * How long the main thread stays alive so the listener can receive messages.
     */
    private static final long LISTEN_MILLIS = 100 * 1000L;

    /**
     * Runs the subscriber demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the connection factory; declared with its concrete type so the
        // prefetch policy can be set without a cast.
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

        // Prefetch 0 = pull mode, prefetch > 0 = push mode. An asynchronous listener
        // requires push mode, so the prefetch must be at least 1.
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setTopicPrefetch(TOPIC_PREFETCH);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);

        Connection connection = null;
        try {
            // Create and start the connection.
            // For a durable subscription, call connection.setClientID(...) here and use
            // session.createDurableSubscriber(topic, name) instead of createConsumer.
            connection = connectionFactory.createConnection();
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic myTestTopic = session.createTopic(TOPIC_NAME);
            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);

            // Asynchronous (push) consumption. The synchronous alternative is
            // messageConsumer3.receive(timeoutMillis), which also works with prefetch 0.
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    printMessage(message);
                }
            });

            // Keep the main thread alive so the listener can keep receiving messages.
            Thread.sleep(LISTEN_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/the JVM can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Always release broker resources, even when setup failed half-way.
            closeQuietly(connection);
        }
    }

    /**
     * Prints the text of a received message, or a fallback line for non-text messages.
     *
     * @param message the message delivered to the listener
     */
    private static void printMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                System.out.println("消费者3 接收到消息：" + ((TextMessage) message).getText());
            } else {
                // Avoid a ClassCastException inside the listener for other message types.
                System.out.println("消费者3 接收到非文本消息：" + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the connection if it was opened, reporting (but not propagating) close failures.
     * Per the JMS API, closing a connection also closes its sessions and consumers.
     *
     * @param connection the connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
